package my.demo;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TSocket {

	Logger logger = LoggerFactory.getLogger(this.getClass());

	InputStream input;
	OutputStream output;

	String host;
	int port;
	int timeout;

	Socket socket;

	public TSocket(String host, int port) {
		this(host, port, 0);
	}

	public TSocket(String host, int port, int timeout) {
		this.host = host;
		this.port = port;
		this.timeout = timeout;
		init();
	}

	private void init() {
		socket = new Socket();

		try {
			socket.setSoLinger(false, 0);
			socket.setTcpNoDelay(true);
			socket.setSoTimeout(timeout);
			socket.setKeepAlive(true);
		} catch (SocketException e) {
			logger.error("error", e);
		}

	}

	public void open() throws IOException {
		try {
			socket.connect(new InetSocketAddress(host, port),
					timeout == 0 ? 30 * 1000 : timeout);
			input = new BufferedInputStream(socket.getInputStream(), 1024);
			output = new BufferedOutputStream(socket.getOutputStream(), 1024);
		} catch (IOException iox) {
			close();
			throw iox;
		}
	}

	public void startReceive() {
		Thread t = new Thread(new Runnable() {

			@Override
			public void run() {
				while (true) {
					logger.info("start read msg");
					try {
						Message msg = read();
						if ("tips".equals(msg.getCmd())) {
							if (listener != null) {
								listener.onpush(msg);
							}
						}

						logger.info("end read msg");
					} catch (Exception e) {
						logger.error("error", e);
						close();
						listener.onerror();
						break;
					}
				}
			}
		});

		t.start();

	}

	public boolean isOpen() {
		if (socket == null) {
			return false;
		}
		return socket.isConnected();
	}

	public void close() {
		if (socket != null&&socket.isConnected()) {
			try {
				socket.close();
				socket = null;
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}

	public void write(Message message) throws IOException {
		String json = message.toJson();
		byte[] bytes = null;
		try {
			bytes = json.getBytes("UTF-8");
		} catch (UnsupportedEncodingException e) {
			e.printStackTrace();
		}
		int length = bytes.length;

		logger.debug("length:{}", length);
		output.write(encodeFrameSize(length));
		output.write(bytes);
		output.flush();
	}

	public Message read() throws IOException {
		byte[] buf = new byte[4];
		readAll(buf);
		int size = decodeFrameSize(buf);
		logger.debug("size:{}", size);
		byte[] content = new byte[size];
		readAll(content);
		String json = new String(content, "UTF-8");
		return Message.build(json);
	}

	public void readAll(byte[] buf) throws IOException {

		int i = 0;
		while (i < buf.length) {
			int r = input.read(buf, i, buf.length - i);

			if (r < 0) {
				throw new IOException();
			}
			i = i + r;
		}
	}

	public static final byte[] encodeFrameSize(final int frameSize) {
		byte[] buf = new byte[4];
		buf[0] = (byte) (0xff & (frameSize >> 24));
		buf[1] = (byte) (0xff & (frameSize >> 16));
		buf[2] = (byte) (0xff & (frameSize >> 8));
		buf[3] = (byte) (0xff & (frameSize));

		return buf;
	}

	public static final int decodeFrameSize(final byte[] buf) {
		return ((buf[0] & 0xff) << 24) | ((buf[1] & 0xff) << 16)
				| ((buf[2] & 0xff) << 8) | ((buf[3] & 0xff));
	}

	private Listener listener;

	public Listener getListener() {
		return listener;
	}

	public void setListener(Listener listener) {
		this.listener = listener;
	}

	public static interface Listener {
		public void onpush(Message msg);

		public void onerror();
	}
}
